package org.msgcenter.mc.actor;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.mysqlclient.MySQLClient;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import lombok.extern.slf4j.Slf4j;
import org.msgcenter.mc.common.ConvertUtil;
import org.msgcenter.mc.common.JsonResult;
import org.msgcenter.mc.ds.*;

import java.util.ArrayList;
import java.util.List;

/**
 * Verticle that owns the MySQL pool of the message center and exposes the
 * database operations as event-bus consumers.
 *
 * <p>Every consumer receives a {@code String} body (a JSON document or a plain id)
 * and replies with a {@link JsonResult} serialized to JSON.
 *
 * <p>Connection settings are read from the verticle configuration and fall back to
 * the previous built-in values when a key is absent:
 * {@code mysql.host}, {@code mysql.port}, {@code mysql.database}, {@code mysql.user},
 * {@code mysql.password} and {@code mysql.poolSize}.
 */
@Slf4j
public class MsgDbActor extends AbstractVerticle {

  private static final String ADD_REQUEST_MSG_SQL = "INSERT INTO `msg_pool` (`msg_id`, `msg_sub_id`, `msg_from`, `msg_type`, `msg_priority`, `push_type`, `channel_type`, `receive_man_id`, `tip_type`, `template_id`, `push_time_str`, `push_time`, `msg_details_url`, `receive_person`, `msg_tag`, `msg_title`, `msg_content`) VALUES " +
    "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

  private static final String GET_MSG_BY_ID_SQL = "SELECT * FROM `msg_push_state` WHERE msg_id = ?";

  private static final String ADD_SITE_MSG_SQL = "INSERT INTO `msg_site` (`msg_id`, `tip_type`, `push_time_str`, `push_time`, `msg_details_url`, `receive_person`, `msg_tag`, `msg_title`, `msg_content`) VALUES " +
    "(?, ?, ?, ?, ?, ?, ?, ?, ?)";

  private static final String ADD_COMPANY_WECHAT_MSG_SQL = "INSERT INTO `msg_company_wechat` (`msg_id`, `tip_type`, `push_time_str`, `push_time`, `msg_details_url`, `receive_person`, `msg_tag`, `msg_title`, `msg_content`) VALUES " +
    "(?, ?, ?, ?, ?, ?, ?, ?, ?)";

  private static final String GET_TPL_BY_CODE_SQL = "SELECT id,tpl_code,tpl_content,add_time FROM `msg_template` where tpl_code = ?";

  private static final String GET_TPL_BY_ID_SQL = "SELECT id,tpl_code,tpl_content,add_time FROM `msg_template` where id = ?";

  private static final String ADD_PUSH_STATE_SQL = "INSERT INTO `msg_push_state` (`msg_id`, `msg_sub_id`, `msg_from`, `msg_type`, `msg_state`, `read_state`, `is_deleted`, `msg_priority`, `push_type`, `channel_type`, `receive_man_id`, `tip_type`, `err_type`, `cb_err_type`, `msg_template_id`, `err_code`, `cb_err_code`, `push_time_str`, `push_time`, `msg_details_url`, `receive_person`, `err_msg`, `cb_err_msg`, `msg_tag`, `msg_title`, `msg_content`) VALUES " +
    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

  private static final String UPDATE_PUSH_STATE_SQL = "UPDATE msg_push_state SET msg_state = ?, push_time = ?, push_time_str = ? , err_type = ?, err_code = ?, err_msg = ? WHERE msg_id = ?";

  private MySQLPool pool;

  /**
   * Creates the MySQL pool, registers all event-bus consumers and only then
   * signals that the verticle has started.
   *
   * @param startPromise completed once the pool and every consumer are set up
   * @throws Exception if the pool or a consumer cannot be created
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    // NOTE(review): the default password is still committed to source control.
    // Supply "mysql.password" through the deployment config, rotate the
    // credential and then drop the literal fallback.
    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(config().getInteger("mysql.port", 3306))
      .setHost(config().getString("mysql.host", "127.0.0.1"))
      .setDatabase(config().getString("mysql.database", "mc"))
      .setUser(config().getString("mysql.user", "root"))
      .setPassword(config().getString("mysql.password", "H9MvYSqY3JmAC4aj"));

    PoolOptions poolOptions = new PoolOptions().setMaxSize(config().getInteger("mysql.poolSize", 5));

    pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
    EventBus eb = vertx.eventBus();

    onAddRequestMsg(eb);
    onAddSiteMsg(eb);
    onAddCompanyWechatMsg(eb);
    onAddPushState(eb);
    onUpdatePushState(eb);

    onGetTplByCode(eb);
    onGetTplById(eb);
    onGetMsgById(eb);

    // Complete the promise last, so the deployment is not reported as successful
    // before the pool and the consumers actually exist.
    super.start(startPromise);
  }

  /**
   * Runs one prepared statement on the pool. The pool acquires a connection for
   * the statement and releases it afterwards.
   *
   * @param sql  statement with {@code ?} placeholders
   * @param args values bound to the placeholders, in order
   * @return future holding the resulting row set
   */
  private Future<RowSet<Row>> query(String sql, Tuple args) {
    return pool.preparedQuery(sql).execute(args);
  }

  /** Builds a {@link TplDto} from one row of {@code msg_template}. */
  private static TplDto toTplDto(Row row) {
    return new TplDto(row.getInteger("id"), row.getString("tpl_code"), row.getString("tpl_content"), row.getLong("add_time"));
  }

  /** Builds a {@link PushStateMsg} from one row of {@code msg_push_state}. */
  private static PushStateMsg toPushStateMsg(Row row) {
    PushStateMsg msg = new PushStateMsg();
    msg.setId(row.getLong("id"));
    msg.setMsgId(row.getLong("msg_id"));
    msg.setMsgSubId(row.getLong("msg_sub_id"));
    msg.setMsgFrom(row.get(Byte.class, "msg_from"));
    msg.setMsgType(row.get(Byte.class, "msg_type"));
    msg.setMsgPriority(row.get(Byte.class, "msg_priority"));
    msg.setPushType(row.get(Byte.class, "push_type"));
    msg.setChannelType(row.getInteger("channel_type"));
    msg.setReceiveManId(row.getString("receive_man_id"));
    msg.setTipType(row.get(Byte.class, "tip_type"));
    msg.setPushTimeStr(row.getString("push_time_str"));
    msg.setPushTime(row.getInteger("push_time"));
    msg.setMsgDetailsUrl(row.getString("msg_details_url"));
    msg.setReceivePerson(row.getString("receive_person"));
    msg.setMsgTag(row.getString("msg_tag"));
    msg.setMsgTitle(row.getString("msg_title"));
    msg.setMsgContent(row.getString("msg_content"));
    return msg;
  }

  /** Inserts a request message into {@code msg_pool}. */
  private Future<RowSet<Row>> addRequestMsg(RequestMsg msg) {
    return query(ADD_REQUEST_MSG_SQL, ConvertUtil.requestMsgToTuple(msg))
      .onFailure(err -> log.error("执行消息入库错误 {}", err.getMessage(), err));
  }

  /**
   * Consumer {@code db.post.add_request_msg}: stores a request message and
   * replies with the generated id, or with an error result when the insert fails.
   */
  private void onAddRequestMsg(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.post.add_request_msg");

    consumer.handler(payload -> {
        RequestMsg requestMsg = JSONUtil.toBean(payload.body(), RequestMsg.class);
        log.info("请求消息入库开始 {}", requestMsg);

        addRequestMsg(requestMsg)
          .onSuccess(r -> {
            Long id = r.property(MySQLClient.LAST_INSERTED_ID);
            InsertCommonResp resp = new InsertCommonResp(id, DateUtil.now());
            JsonResult<String, InsertCommonResp> result = new JsonResult<>(HttpStatus.HTTP_OK, "请求消息入库成功", resp);
            log.info("请求消息入库结束 {}", id);

            payload.reply(JSONUtil.toJsonStr(result));
          })
          .onFailure(err -> {
            // Without a reply the sender would only find out through a timeout.
            JsonResult<String, String> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "请求消息入库失败", err.getMessage());
            payload.reply(JSONUtil.toJsonStr(jr));
          });
      })
      .exceptionHandler(err -> log.error("请求消息入库出错 {}", err.getMessage(), err));
  }

  /** Selects the rows of {@code msg_push_state} that belong to the given message id. */
  private Future<RowSet<Row>> getMsgById(Long id) {
    return query(GET_MSG_BY_ID_SQL, Tuple.of(id))
      .onFailure(err -> log.error("获取消息详情失败 {}", err.getMessage(), err));
  }

  /**
   * Consumer {@code db.get.select_msg_by_id}: replies with the first push-state
   * row of the message, a not-found result when there is none, or an error result
   * when the query fails.
   */
  private void onGetMsgById(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.get.select_msg_by_id");

    consumer.handler(payload -> {
        Long id = Convert.toLong(payload.body());
        log.info("获取消息详情开始 {}", id);

        getMsgById(id)
          .onSuccess(res -> {
            log.info("获取消息详情成功 总共：{}", res.size());

            if (res.size() == 0) {
              // Previously this case threw inside the handler and never replied.
              JsonResult<String, PushStateMsg> notFound = new JsonResult<>(HttpStatus.HTTP_NOT_FOUND, StrUtil.format("获取消息详情失败，没有相关内容: {}", id), null);
              payload.reply(JSONUtil.toJsonStr(notFound));
              return;
            }

            // Only the first row is returned to the caller, so only that one is mapped.
            PushStateMsg first = toPushStateMsg(res.iterator().next());
            JsonResult<String, PushStateMsg> result = new JsonResult<>(HttpStatus.HTTP_OK, "获取消息详情成功", first);
            payload.reply(JSONUtil.toJsonStr(result));
          })
          .onFailure(err ->
            payload.reply(JSONUtil.toJsonStr(new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, StrUtil.format("获取消息详情失败 {}", err.getMessage()), null))));
      })
      .exceptionHandler(err -> log.error("获取消息详情出错 {}", err.getMessage(), err));
  }

  /** Inserts a site message into {@code msg_site}. */
  private Future<RowSet<Row>> addSiteMsg(SiteMsg msg) {
    return query(ADD_SITE_MSG_SQL, ConvertUtil.siteMsgToTuple(msg));
  }

  /**
   * Consumer {@code db.post.add_site_msg}: stores a site message and replies
   * with the generated id, or with an error result when the insert fails.
   */
  private void onAddSiteMsg(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.post.add_site_msg");

    consumer.handler(msg -> {
      SiteMsg siteMsg = JSONUtil.toBean(msg.body(), SiteMsg.class);
      // Logged before the insert so that "start" really precedes the work.
      log.info("站点消息入库开始 {}", siteMsg);

      addSiteMsg(siteMsg)
        .onSuccess(r -> {
          Long id = r.property(MySQLClient.LAST_INSERTED_ID);
          InsertCommonResp resp = new InsertCommonResp(id, DateUtil.now());
          JsonResult<String, InsertCommonResp> result = new JsonResult<>(HttpStatus.HTTP_OK, "站点消息入库成功", resp);
          log.info("站点消息入库结束 {}", id);

          msg.reply(JSONUtil.toJsonStr(result));
        })
        .onFailure(err -> {
          log.error("站点消息入库失败 {}", err.getMessage(), err);

          JsonResult<String, String> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "站点消息入库失败", err.getMessage());
          msg.reply(JSONUtil.toJsonStr(jr));
        });
    });
  }

  /** Inserts a company WeChat message into {@code msg_company_wechat}. */
  private Future<RowSet<Row>> addCompanyWechatMsg(CompanyWechatMsg msg) {
    return query(ADD_COMPANY_WECHAT_MSG_SQL, ConvertUtil.companyWechatMsgToTuple(msg));
  }

  /**
   * Consumer {@code db.post.add_company_wechat_msg}: stores a company WeChat
   * message and replies with the generated id, or with an error result when the
   * insert fails.
   */
  private void onAddCompanyWechatMsg(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.post.add_company_wechat_msg");

    consumer.handler(msg -> {
      CompanyWechatMsg cwm = JSONUtil.toBean(msg.body(), CompanyWechatMsg.class);
      // Logged before the insert so that "start" really precedes the work.
      log.info("企业消息入库开始 {}", cwm);

      addCompanyWechatMsg(cwm)
        .onSuccess(r -> {
          Long id = r.property(MySQLClient.LAST_INSERTED_ID);
          InsertCommonResp resp = new InsertCommonResp(id, DateUtil.now());
          JsonResult<String, InsertCommonResp> result = new JsonResult<>(HttpStatus.HTTP_OK, "企业消息入库成功", resp);
          log.info("企业消息入库结束 {}", id);

          msg.reply(JSONUtil.toJsonStr(result));
        })
        .onFailure(err -> {
          log.error("企业消息入库失败 {}", err.getMessage(), err);

          JsonResult<String, String> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "企业消息入库失败", err.getMessage());
          msg.reply(JSONUtil.toJsonStr(jr));
        });
    });
  }

  /** Selects the template whose {@code tpl_code} equals the code carried by the request. */
  private Future<RowSet<Row>> selectTplByCode(TplRequestMsg msg) {
    return query(GET_TPL_BY_CODE_SQL, Tuple.of(msg.getTplCode()));
  }

  /**
   * Consumer {@code db.get.select_tpl_by_code}: replies with the template when
   * exactly one row matches, with a code-0 result otherwise, or with an error
   * result when the query fails.
   */
  private void onGetTplByCode(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.get.select_tpl_by_code");

    consumer.handler(payload -> {
      TplRequestMsg tplRequestMsg = JSONUtil.toBean(payload.body(), TplRequestMsg.class);
      log.info("获取模板内容开始 {}", tplRequestMsg);

      selectTplByCode(tplRequestMsg)
        .onSuccess(r -> {
          if (r.size() == 1) {
            List<TplDto> list = new ArrayList<>();
            r.forEach(row -> list.add(toTplDto(row)));

            log.info("获取模板内容结束 {}", list);
            JsonResult<String, TplDto> resp = new JsonResult<>(HttpStatus.HTTP_OK, "获取模板内容成功", list.get(0));

            payload.reply(JSONUtil.toJsonStr(resp));
          } else {
            String notFound = StrUtil.format("获取模板内容出错，没有相关内容: {}", tplRequestMsg);
            JsonResult<String, String> resp = new JsonResult<>(0, notFound, "");
            log.error(notFound);

            payload.reply(JSONUtil.toJsonStr(resp));
          }
        })
        .onFailure(err -> {
          // Without a reply the sender would only find out through a timeout.
          log.error("获取模板内容失败 {}", err.getMessage(), err);

          JsonResult<String, String> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "获取模板内容失败", err.getMessage());
          payload.reply(JSONUtil.toJsonStr(jr));
        });
    });
  }

  /** Selects the template with the given primary key. */
  private Future<RowSet<Row>> selectTplById(Integer tplId) {
    return query(GET_TPL_BY_ID_SQL, Tuple.of(tplId));
  }

  /**
   * Consumer {@code db.get.select_tpl_by_id}: replies with the template when
   * exactly one row matches, with a code-0 result otherwise, or with an error
   * result when the query fails.
   */
  private void onGetTplById(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.get.select_tpl_by_id");

    consumer.handler(payload -> {
      Integer req = Convert.toInt(payload.body());
      log.info("获取模板内容开始 {}", req);

      selectTplById(req)
        .onSuccess(res -> {
          if (res.size() == 1) {
            List<TplDto> list = new ArrayList<>();
            res.forEach(row -> list.add(toTplDto(row)));

            log.info("获取模板内容结束 {}", list);
            JsonResult<String, TplDto> resp = new JsonResult<>(HttpStatus.HTTP_OK, "获取模板内容成功", list.get(0));

            payload.reply(JSONUtil.toJsonStr(resp));
          } else {
            String notFound = StrUtil.format("获取模板内容出错，没有相关内容: {}", req);
            JsonResult<String, TplDto> resp = new JsonResult<>(0, notFound, null);
            log.error(notFound);

            payload.reply(JSONUtil.toJsonStr(resp));
          }
        })
        .onFailure(err -> {
          // Without a reply the sender would only find out through a timeout.
          log.error("获取模板内容失败 {}", err.getMessage(), err);

          JsonResult<String, String> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "获取模板内容失败", err.getMessage());
          payload.reply(JSONUtil.toJsonStr(jr));
        });
    });
  }

  /** Inserts a push-state record into {@code msg_push_state}. */
  private Future<RowSet<Row>> addPushState(PushStateMsg msg) {
    return query(ADD_PUSH_STATE_SQL, ConvertUtil.pushStateToTuple(msg))
      .onFailure(err -> log.error("推送状态入库错误 {}", err.getMessage(), err));
  }

  /**
   * Consumer {@code db.post.add_push_state}: stores a push-state record and
   * replies with the generated id, or echoes the request in an error result when
   * the insert fails.
   */
  private void onAddPushState(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.post.add_push_state");

    consumer.handler(payload -> {
      PushStateMsg req = JSONUtil.toBean(payload.body(), PushStateMsg.class);
      log.info("推送状态入库开始 {}", req);

      addPushState(req)
        .onSuccess(r -> {
          Long id = r.property(MySQLClient.LAST_INSERTED_ID);
          InsertCommonResp resp = new InsertCommonResp(id, DateUtil.now());
          JsonResult<String, InsertCommonResp> result = new JsonResult<>(HttpStatus.HTTP_OK, "推送状态入库成功", resp);
          log.info("推送状态入库结束 {}", id);

          payload.reply(JSONUtil.toJsonStr(result));
        })
        .onFailure(err -> {
          log.error("推送状态入库失败 {}", err.getMessage(), err);

          JsonResult<String, PushStateMsg> result = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "推送状态入库失败", req);
          payload.reply(JSONUtil.toJsonStr(result));
        });
    });
  }

  /** Updates the state and error columns of the {@code msg_push_state} rows of a message. */
  private Future<RowSet<Row>> updatePushState(PushStateMsg msg) {
    return query(UPDATE_PUSH_STATE_SQL, ConvertUtil.updatePushStateToTuple(msg))
      .onFailure(err -> log.error("更新推送状态错误 {}", err.getMessage(), err));
  }

  /**
   * Consumer {@code db.post.update_push_state}: updates the push state and
   * replies with the message id, or echoes the request in an error result when
   * the update fails.
   */
  private void onUpdatePushState(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("db.post.update_push_state");

    consumer.handler(payload -> {
      PushStateMsg req = JSONUtil.toBean(payload.body(), PushStateMsg.class);
      log.info("更新推送状态开始 {}", req);

      updatePushState(req)
        .onSuccess(r -> {
          JsonResult<String, Long> result = new JsonResult<>(HttpStatus.HTTP_OK, "更新推送状态成功", req.getMsgId());
          log.info("更新推送状态成功 {}", req.getMsgId());

          payload.reply(JSONUtil.toJsonStr(result));
        })
        .onFailure(err -> {
          log.error("更新推送状态失败 {}", err.getMessage(), err);

          JsonResult<String, PushStateMsg> result = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, "更新推送状态失败", req);
          payload.reply(JSONUtil.toJsonStr(result));
        });
    });
  }

  /**
   * Closes the MySQL pool, if one was created, before signalling that the
   * verticle has stopped. A failure to close is logged and does not fail the stop.
   *
   * @param stopPromise completed once the pool has been closed
   * @throws Exception if the default stop handling fails
   */
  @Override
  public void stop(Promise<Void> stopPromise) throws Exception {
    if (pool == null) {
      super.stop(stopPromise);
      return;
    }

    pool.close().onComplete(closed -> {
      if (closed.failed()) {
        log.warn("关闭数据库连接池失败 {}", closed.cause().getMessage(), closed.cause());
      }
      stopPromise.complete();
    });
  }
}
